You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/09/07 18:35:54 UTC
[1/3] phoenix git commit: PHOENIX-4173 Ensure that the rebuild fails
if an index that transitions back to disabled while rebuilding
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-1.2 1fb01af6b -> 5cf07c4ce
PHOENIX-4173 Ensure that the rebuild fails if an index that transitions back to disabled while rebuilding
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3c5e48d9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3c5e48d9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3c5e48d9
Branch: refs/heads/4.x-HBase-1.2
Commit: 3c5e48d9246f44cc39181b9c1cb9b51fb60bdd32
Parents: 1fb01af
Author: James Taylor <ja...@apache.org>
Authored: Wed Sep 6 12:46:34 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Sep 7 11:34:13 2017 -0700
----------------------------------------------------------------------
.../end2end/index/PartialIndexRebuilderIT.java | 151 ++++++++++++++++++-
1 file changed, 143 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c5e48d9/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index cacf0fa..067f50f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
@@ -30,7 +31,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
@@ -38,10 +39,13 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PMetaData;
import org.apache.phoenix.schema.PTable;
@@ -634,6 +638,94 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
}
}
+ private final static CountDownLatch WAIT_FOR_REBUILD_TO_START = new CountDownLatch(1);
+ private final static CountDownLatch WAIT_FOR_INDEX_WRITE = new CountDownLatch(1);
+
+
+ /**
+ * Scenario for PHOENIX-4173: while a partial index rebuild is held inside
+ * {@code WriteFailingRegionObserver}, the index is put back into the DISABLE state with a
+ * non-zero disable timestamp. The test then advances the clock, waits for the index to
+ * reach ACTIVE through a later rebuild, and scrutinizes the index against the data table.
+ */
+ @Test
+ public void testDisableIndexDuringRebuild() throws Throwable {
+ String schemaName = generateUniqueName();
+ String tableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+ PTableKey key = new PTableKey(null,fullTableName);
+ // The injected clock is advanced manually after each step of the test
+ final MyClock clock = new MyClock(1000);
+ EnvironmentEdgeManager.injectEdge(clock);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache();
+ conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+ clock.time += 100;
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2) INCLUDE (v3)");
+ clock.time += 100;
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0','x')");
+ conn.commit();
+ clock.time += 100;
+ try (HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
+ // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the partial index rebuilder from triggering
+ IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE);
+ clock.time += 100;
+ long disableTime = clock.currentTime();
+ // Set some values while index disabled
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb', '11','yy')");
+ conn.commit();
+ clock.time += 100;
+ assertTrue(hasDisabledIndex(metaCache, key));
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','222','zzz')");
+ conn.commit();
+ clock.time += 100;
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','dddd','3333','zzzz')");
+ conn.commit();
+ clock.time += 100;
+ // Will cause partial index rebuilder to be triggered
+ IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
+ final CountDownLatch doneSignal = new CountDownLatch(1);
+ advanceClockUntilPartialRebuildStarts(fullIndexName, clock, doneSignal);
+ // Set some values while index is in INACTIVE state
+ clock.time += 100;
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee','44444','zzzzz')");
+ conn.commit();
+ clock.time += 100;
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','fffff','55555','zzzzzz')");
+ conn.commit();
+ // NOTE(review): the boolean returned by await() is ignored here and in the two waits below,
+ // so a 30 second timeout does not fail the test at the point where it happens.
+ doneSignal.await(30, TimeUnit.SECONDS);
+ // Install coprocessor that will simulate an index write failure during index rebuild
+ addWriteFailingCoprocessor(conn,fullIndexName);
+ clock.time += WAIT_AFTER_DISABLED;
+ doneSignal.await(30, TimeUnit.SECONDS);
+ // Counted down by WriteFailingRegionObserver.postBatchMutate
+ WAIT_FOR_REBUILD_TO_START.await(30, TimeUnit.SECONDS);
+ // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the partial index rebuilder from triggering
+ IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE);
+ clock.time += 100;
+ disableTime = clock.currentTime();
+ // Set some values while index disabled
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bbbbb', '11','yy')");
+ conn.commit();
+ clock.time += 100;
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','cccccc','222','zzz')");
+ conn.commit();
+ clock.time += 100;
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ddddddd','3333','zzzz')");
+ conn.commit();
+ clock.time += 100;
+ // Simulates another write failure. Should cause current run of rebuilder to fail and retry again later
+ IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
+ removeWriteFailingCoprocessor(conn,fullIndexName);
+ // Release any observer invocation still blocked on this latch in postBatchMutate
+ WAIT_FOR_INDEX_WRITE.countDown();
+ }
+ // Original rebuilder should have failed
+
+ advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
+ clock.time += WAIT_AFTER_DISABLED * 2;
+ // Enough time has passed, so rebuild will start now
+ TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+ clock.time += 100;
+ IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+ } finally {
+ // Always remove the injected clock so it does not outlive this test
+ EnvironmentEdgeManager.injectEdge(null);
+ }
+ }
+
@Test
public void testDeleteAndUpsertValuesAtSameTS1() throws Throwable {
String schemaName = generateUniqueName();
@@ -751,15 +843,58 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
t.start();
}
- public static class DelayingRegionObserver extends SimpleRegionObserver {
- @Override
- public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
- try {
- Thread.sleep(Math.abs(RAND.nextInt()) % 10);
- } catch (InterruptedException e) {
+ /**
+ * Adds {@code WriteFailingRegionObserver} to the descriptor of the given table and polls
+ * once per second until the admin API returns the modified descriptor.
+ *
+ * @param conn connection used to obtain the query services and the HBase admin
+ * @param tableName name of the table whose descriptor is modified
+ * @throws Exception if the modified descriptor is still not visible once the retries are used up
+ */
+ private static void addWriteFailingCoprocessor(Connection conn, String tableName) throws Exception {
+ int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100;
+ ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
+ descriptor.addCoprocessor(WriteFailingRegionObserver.class.getName(), null, priority, null);
+ final int maxTries = 10;
+ int numTries = maxTries;
+ try (HBaseAdmin admin = services.getAdmin()) {
+ admin.modifyTable(Bytes.toBytes(tableName), descriptor);
+ while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor)
+ && numTries > 0) {
+ numTries--;
+ if (numTries == 0) {
+ // numTries is always 0 here, so report the configured limit instead
+ throw new Exception(
+ "Check to detect if write failing co-processor was added failed after "
+ + maxTries + " retries.");
+ }
+ Thread.sleep(1000);
}
-
}
}
+ /**
+ * Removes {@code WriteFailingRegionObserver} from the descriptor of the given table and polls
+ * once per second until the admin API returns the modified descriptor.
+ *
+ * @param conn connection used to obtain the query services and the HBase admin
+ * @param tableName name of the table whose descriptor is modified
+ * @throws Exception if the modified descriptor is still not visible once the retries are used up
+ */
+ private static void removeWriteFailingCoprocessor(Connection conn, String tableName) throws Exception {
+ ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
+ descriptor.removeCoprocessor(WriteFailingRegionObserver.class.getName());
+ final int maxTries = 10;
+ int numTries = maxTries;
+ try (HBaseAdmin admin = services.getAdmin()) {
+ admin.modifyTable(Bytes.toBytes(tableName), descriptor);
+ while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor)
+ && numTries > 0) {
+ numTries--;
+ if (numTries == 0) {
+ // numTries is always 0 here, so report the configured limit instead
+ throw new Exception(
+ "Check to detect if write failing co-processor was removed failed after "
+ + maxTries + " retries.");
+ }
+ Thread.sleep(1000);
+ }
+ }
+ }
+
+ /**
+ * Region observer used by {@code testDisableIndexDuringRebuild}. After each batch mutation it
+ * counts down {@code WAIT_FOR_REBUILD_TO_START} and then waits up to 30 seconds for
+ * {@code WAIT_FOR_INDEX_WRITE} to be released.
+ */
+ public static class WriteFailingRegionObserver extends SimpleRegionObserver {
+ /**
+ * Signals that a write has reached this table, then holds the caller until the test releases it.
+ *
+ * @throws IOException if the waiting thread is interrupted
+ */
+ @Override
+ public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ WAIT_FOR_REBUILD_TO_START.countDown();
+ try {
+ // NOTE(review): a timeout is not treated as an error; the call simply returns
+ WAIT_FOR_INDEX_WRITE.await(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // Restore the interrupt status, which is cleared when InterruptedException is thrown
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ }
+ }
+ }
+
}
[3/3] phoenix git commit: PHOENIX-3953 Clear INDEX_DISABLED_TIMESTAMP
and disable index on compaction (addendum 2)
Posted by ja...@apache.org.
PHOENIX-3953 Clear INDEX_DISABLED_TIMESTAMP and disable index on compaction (addendum 2)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5cf07c4c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5cf07c4c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5cf07c4c
Branch: refs/heads/4.x-HBase-1.2
Commit: 5cf07c4ce64174241e0c311c6f9e1905374aaeca
Parents: aea6106
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Sep 7 11:26:47 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Sep 7 11:34:53 2017 -0700
----------------------------------------------------------------------
.../UngroupedAggregateRegionObserver.java | 62 ++++++++++++++-
.../org/apache/phoenix/hbase/index/Indexer.java | 42 +++++-----
.../stats/DefaultStatisticsCollector.java | 83 ++++++--------------
.../schema/stats/NoOpStatisticsCollector.java | 2 +-
.../schema/stats/StatisticsCollector.java | 2 +-
5 files changed, 111 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cf07c4c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 31c83e4..a61f502 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -55,9 +55,12 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -69,11 +72,14 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
import org.apache.phoenix.coprocessor.generated.PTableProtos;
import org.apache.phoenix.exception.DataExceedsCapacityException;
import org.apache.phoenix.execute.TupleProjector;
@@ -89,11 +95,13 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PRow;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
@@ -899,6 +907,58 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
});
}
+ /**
+ * After a compaction that covers all files or is a major compaction, reads
+ * INDEX_DISABLE_TIMESTAMP for this region's table from SYSTEM.CATALOG and, when it is
+ * non-zero, sets the index state to DISABLE with a disable timestamp of 0.
+ * Any failure while doing so is logged and swallowed.
+ */
+ @Override
+ public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> e, final Store store,
+ final StoreFile resultFile, CompactionRequest request) throws IOException {
+ // If we're compacting all files, then delete markers are removed
+ // and we must permanently disable an index that needs to be
+ // partially rebuilt because we're potentially losing the information
+ // we need to successfully rebuild it.
+ if (request.isAllFiles() || request.isMajor()) {
+ // Compaction and split upcalls run with the effective user context of the requesting user.
+ // This will lead to failure of cross cluster RPC if the effective user is not
+ // the login user. Switch to the login user context to ensure we have the expected
+ // security context.
+ User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ // Stays null unless updateIndexState below returns a result
+ MutationCode mutationCode = null;
+ long disableIndexTimestamp = 0;
+
+ try (HTableInterface htable = e.getEnvironment().getTable(
+ SchemaUtil.getPhysicalTableName(
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
+ e.getEnvironment().getConfiguration()))) {
+ String tableName = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
+ // FIXME: if this is an index on a view, we won't find a row for it in SYSTEM.CATALOG
+ // Instead, we need to disable all indexes on the view.
+ byte[] tableKey = SchemaUtil.getTableKeyFromFullName(tableName);
+ Get get = new Get(tableKey);
+ get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
+ Result result = htable.get(get);
+ if (!result.isEmpty()) {
+ // Only one column was requested, so the first cell is INDEX_DISABLE_TIMESTAMP
+ Cell cell = result.listCells().get(0);
+ if (cell.getValueLength() > 0) {
+ disableIndexTimestamp = PLong.INSTANCE.getCodec().decodeLong(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault());
+ if (disableIndexTimestamp != 0) {
+ mutationCode = IndexUtil.updateIndexState(tableKey, 0L, htable, PIndexState.DISABLE).getMutationCode();
+ }
+ }
+ }
+ } catch (Throwable t) { // log, but swallow exception as we don't want to impact compaction
+ logger.warn("Potential failure to permanently disable index during compaction " + e.getEnvironment().getRegionInfo().getTable().getNameAsString(), t);
+ } finally {
+ // NOTE(review): if an exception is caught after a non-zero timestamp was read,
+ // mutationCode is still null and this warning is logged in addition to the one above.
+ if (disableIndexTimestamp != 0 && mutationCode != MutationCode.TABLE_ALREADY_EXISTS && mutationCode != MutationCode.TABLE_NOT_FOUND) {
+ logger.warn("Attempt to permanently disable index " + e.getEnvironment().getRegionInfo().getTable().getNameAsString() +
+ " during compaction" + (mutationCode == null ? "" : " failed with code = " + mutationCode));
+ }
+ }
+ return null;
+ }
+ });
+ }
+ }
+
private static PTable deserializeTable(byte[] b) {
try {
PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b);
@@ -1115,7 +1175,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
long rowCount = 0;
try {
if (!compactionRunning) {
- stats.init(false);
+ stats.init();
synchronized (innerScanner) {
do {
List<Cell> results = new ArrayList<Cell>();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cf07c4c/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 8072fba..4273eb1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -62,6 +62,8 @@ import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.security.User;
@@ -845,21 +847,25 @@ public class Indexer extends BaseRegionObserver {
}
@Override
- public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
- final InternalScanner scanner, final ScanType scanType) throws IOException {
- // Compaction and split upcalls run with the effective user context of the requesting user.
- // This will lead to failure of cross cluster RPC if the effective user is not
- // the login user. Switch to the login user context to ensure we have the expected
- // security context.
- return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
- @Override
- public InternalScanner run() throws Exception {
- InternalScanner internalScanner = scanner;
- if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
+ public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+ final StoreFile resultFile, CompactionRequest request) throws IOException {
+ // If we're compacting all files, then delete markers are removed
+ // and we must permanently disable an index that needs to be
+ // partially rebuild because we're potentially losing the information
+ // we need to successfully rebuilt it.
+ if (request.isAllFiles() || request.isMajor()) {
+ // Compaction and split upcalls run with the effective user context of the requesting user.
+ // This will lead to failure of cross cluster RPC if the effective user is not
+ // the login user. Switch to the login user context to ensure we have the expected
+ // security context.
+ User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
try {
PhoenixConnection conn = QueryUtil.getConnectionOnServer(c.getEnvironment().getConfiguration()).unwrap(PhoenixConnection.class);
PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+ // FIXME: we may need to recurse into children of this table too
for (PTable index : table.getIndexes()) {
if (index.getIndexDisableTimestamp() != 0) {
try {
@@ -871,15 +877,15 @@ public class Indexer extends BaseRegionObserver {
}
} catch (Exception e) {
// If we can't reach the stats table, don't interrupt the normal
- // compaction operation, just log a warning.
- if (LOG.isWarnEnabled()) {
- LOG.warn("Unable to permanently disable indexes being partially rebuild for " + fullTableName, e);
- }
+ // compaction operation, just log a warning.
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Unable to permanently disable indexes being partially rebuild for " + fullTableName, e);
+ }
}
+ return null;
}
- return internalScanner;
- }
- });
+ });
+ }
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cf07c4c/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
index 61a6fa2..b8ba759 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
@@ -42,16 +41,13 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
-import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TimeKeeper;
@@ -113,10 +109,9 @@ class DefaultStatisticsCollector implements StatisticsCollector {
}
}
- private void initGuidepostDepth(boolean isMajorCompaction) throws IOException {
+ private void initGuidepostDepth() throws IOException {
// First check is if guidepost info set on statement itself
- boolean guidepostOnStatement = guidePostPerRegionBytes != null || guidePostWidthBytes != null;
- if (guidepostOnStatement) {
+ if (guidePostPerRegionBytes != null || guidePostWidthBytes != null) {
int guidepostPerRegion = 0;
long guidepostWidth = QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES;
if (guidePostPerRegionBytes != null) {
@@ -127,48 +122,20 @@ class DefaultStatisticsCollector implements StatisticsCollector {
}
this.guidePostDepth = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth,
env.getRegion().getTableDesc());
- }
-
- if (!guidepostOnStatement || isMajorCompaction) {
+ } else {
long guidepostWidth = -1;
HTableInterface htable = null;
try {
- // Next check for GUIDE_POST_WIDTH and INDEX_DISABLE_TIMESTAMP on table
- TableName htableName = SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration());
- htable = env.getTable(htableName);
+ // Next check for GUIDE_POST_WIDTH on table
+ htable = env.getTable(
+ SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()));
Get get = new Get(ptableKey);
get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES);
- get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
Result result = htable.get(get);
if (!result.isEmpty()) {
- Cell gpwCell = result.getColumnLatestCell(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES);
- if (gpwCell != null) {
- guidepostWidth = PLong.INSTANCE.getCodec().decodeLong(gpwCell.getValueArray(), gpwCell.getValueOffset(), SortOrder.getDefault());
- }
- if (isMajorCompaction) {
- Cell idtsCell = result.getColumnLatestCell(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
- if (idtsCell != null) {
- long indexDisableTimestamp = PLong.INSTANCE.getCodec().decodeLong(idtsCell.getValueArray(), idtsCell.getValueOffset(), SortOrder.getDefault());
- // If we have a non zero value for INDEX_DISABLE_TIMESTAMP, that means that our global mutable
- // secondary index needs to be partially rebuilt. If we're compacting, though, we may cleanup
- // the delete markers of an index *before* the puts for the same row occur during replay. At
- // this point the partially index rebuild would leave the index out of sync with the data
- // table. In that case, it's better to just permanently disable the index and force it to be
- // manually rebuilt
- if (indexDisableTimestamp != 0) {
- MutationCode mutationCode = IndexUtil.updateIndexState(ptableKey, 0L, htable, PIndexState.DISABLE).getMutationCode();
- if (mutationCode != MutationCode.TABLE_ALREADY_EXISTS && mutationCode != MutationCode.TABLE_NOT_FOUND) {
- LOG.warn("Attempt to permanently disable index " + env.getRegionInfo().getTable().getNameAsString() +
- " during compaction failed with code = " + mutationCode);
- }
- }
- }
- }
+ Cell cell = result.listCells().get(0);
+ guidepostWidth = PLong.INSTANCE.getCodec().decodeLong(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault());
}
- } catch (IOException e) {
- throw e;
- } catch (Throwable t) {
- throw new IOException(t);
} finally {
if (htable != null) {
try {
@@ -178,21 +145,19 @@ class DefaultStatisticsCollector implements StatisticsCollector {
}
}
}
- if (!guidepostOnStatement) {
- if (guidepostWidth >= 0) {
- this.guidePostDepth = guidepostWidth;
- } else {
- // Last use global config value
- Configuration config = env.getConfiguration();
- this.guidePostDepth = StatisticsUtil.getGuidePostDepth(
- config.getInt(
- QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
- QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION),
- config.getLong(
- QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
- QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES),
- env.getRegion().getTableDesc());
- }
+ if (guidepostWidth >= 0) {
+ this.guidePostDepth = guidepostWidth;
+ } else {
+ // Last use global config value
+ Configuration config = env.getConfiguration();
+ this.guidePostDepth = StatisticsUtil.getGuidePostDepth(
+ config.getInt(
+ QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION),
+ config.getLong(
+ QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES),
+ env.getRegion().getTableDesc());
}
}
}
@@ -350,13 +315,13 @@ class DefaultStatisticsCollector implements StatisticsCollector {
StatisticsScanner scanner = new StatisticsScanner(this, statsWriter, env, internalScan, family);
// We need to initialize the scanner synchronously and potentially perform a cross region Get
// in order to use the correct guide posts width for the table being compacted.
- init(true);
+ init();
return scanner;
}
@Override
- public void init(boolean isMajorCompaction) throws IOException {
- initGuidepostDepth(isMajorCompaction);
+ public void init() throws IOException {
+ initGuidepostDepth();
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cf07c4c/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
index a13a722..74d1710 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
@@ -61,7 +61,7 @@ public class NoOpStatisticsCollector implements StatisticsCollector {
}
@Override
- public void init(boolean isMajorCompaction) {
+ public void init() {
// No-op
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cf07c4c/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index 9550469..60e83a8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -64,7 +64,7 @@ public interface StatisticsCollector extends Closeable {
* Called before beginning the collection of statistics through {@link #collectStatistics(List)}
* @throws IOException
*/
- void init(boolean isMajorCompaction) throws IOException;
+ void init() throws IOException;
/**
* Retrieve the calculated guide post info for the given column family.
[2/3] phoenix git commit: PHOENIX-4175 Convert tests using
CURRENT_SCN to not use it when possible
Posted by ja...@apache.org.
PHOENIX-4175 Convert tests using CURRENT_SCN to not use it when possible
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/aea61062
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/aea61062
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/aea61062
Branch: refs/heads/4.x-HBase-1.2
Commit: aea6106284bbf565a521e4e211b090525dec5129
Parents: 3c5e48d
Author: James Taylor <ja...@apache.org>
Authored: Wed Sep 6 18:05:42 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Sep 7 11:34:35 2017 -0700
----------------------------------------------------------------------
.../apache/phoenix/end2end/CreateSchemaIT.java | 26 +++----
.../phoenix/end2end/CustomEntityDataIT.java | 75 ++++++++++++--------
.../apache/phoenix/end2end/UpsertSelectIT.java | 42 +++++++++--
3 files changed, 90 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/aea61062/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java
index 09cd810..fe09dcd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java
@@ -30,41 +30,31 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.NewerSchemaAlreadyExistsException;
import org.apache.phoenix.schema.SchemaAlreadyExistsException;
-import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
-public class CreateSchemaIT extends BaseClientManagedTimeIT {
+public class CreateSchemaIT extends ParallelStatsDisabledIT {
@Test
public void testCreateSchema() throws Exception {
- long ts = nextTimestamp();
- Properties props = new Properties();
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(true));
- String ddl = "CREATE SCHEMA TEST_SCHEMA";
+ String schemaName = generateUniqueName();
+ String ddl = "CREATE SCHEMA " + schemaName;
try (Connection conn = DriverManager.getConnection(getUrl(), props);
HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) {
conn.createStatement().execute(ddl);
- assertNotNull(admin.getNamespaceDescriptor("TEST_SCHEMA"));
+ assertNotNull(admin.getNamespaceDescriptor(schemaName));
}
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 10));
- try (Connection conn = DriverManager.getConnection(getUrl(), props);) {
- conn.createStatement().execute(ddl);
- fail();
- } catch (SchemaAlreadyExistsException e) {
- // expected
- }
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts - 20));
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.createStatement().execute(ddl);
fail();
- } catch (NewerSchemaAlreadyExistsException e) {
+ } catch (SchemaAlreadyExistsException e) {
// expected
}
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 50));
Connection conn = DriverManager.getConnection(getUrl(), props);
try {
conn.createStatement().execute("CREATE SCHEMA " + SchemaUtil.SCHEMA_FOR_DEFAULT_NAMESPACE);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/aea61062/phoenix-core/src/it/java/org/apache/phoenix/end2end/CustomEntityDataIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CustomEntityDataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CustomEntityDataIT.java
index ad0f308..4af2c5c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CustomEntityDataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CustomEntityDataIT.java
@@ -17,7 +17,6 @@
*/
package org.apache.phoenix.end2end;
-import static org.apache.phoenix.util.TestUtil.CUSTOM_ENTITY_DATA_FULL_NAME;
import static org.apache.phoenix.util.TestUtil.ROW2;
import static org.apache.phoenix.util.TestUtil.ROW5;
import static org.apache.phoenix.util.TestUtil.ROW9;
@@ -32,26 +31,49 @@ import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
-import java.util.Properties;
-import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.junit.Test;
-public class CustomEntityDataIT extends BaseClientManagedTimeIT {
+public class CustomEntityDataIT extends ParallelStatsDisabledIT {
- protected static void initTableValues(String tenantId, byte[][] splits, long ts) throws Exception {
- ensureTableCreated(getUrl(),CUSTOM_ENTITY_DATA_FULL_NAME,CUSTOM_ENTITY_DATA_FULL_NAME, ts-2);
-
- String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts;
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(url, props);
+ private static void initTableValues(Connection conn, String tenantId, String tableName) throws Exception {
+ String ddl = "create table " + tableName +
+ " (organization_id char(15) not null, \n" +
+ " key_prefix char(3) not null,\n" +
+ " custom_entity_data_id char(12) not null,\n" +
+ " created_by varchar,\n" +
+ " created_date date,\n" +
+ " currency_iso_code char(3),\n" +
+ " deleted char(1),\n" +
+ " division decimal(31,10),\n" +
+ " last_activity date,\n" +
+ " last_update date,\n" +
+ " last_update_by varchar,\n" +
+ " name varchar(240),\n" +
+ " owner varchar,\n" +
+ " record_type_id char(15),\n" +
+ " setup_owner varchar,\n" +
+ " system_modstamp date,\n" +
+ " b.val0 varchar,\n" +
+ " b.val1 varchar,\n" +
+ " b.val2 varchar,\n" +
+ " b.val3 varchar,\n" +
+ " b.val4 varchar,\n" +
+ " b.val5 varchar,\n" +
+ " b.val6 varchar,\n" +
+ " b.val7 varchar,\n" +
+ " b.val8 varchar,\n" +
+ " b.val9 varchar\n" +
+ " CONSTRAINT pk PRIMARY KEY (organization_id, key_prefix, custom_entity_data_id)) SPLIT ON ('" + tenantId + "00A','" + tenantId + "00B','" + tenantId + "00C')";
+
+ conn.createStatement().execute(ddl);
// Insert all rows at ts
PreparedStatement stmt = conn.prepareStatement(
- "upsert into " +
- "CORE.CUSTOM_ENTITY_DATA(" +
+ "upsert into " + tableName +
+ "(" +
" ORGANIZATION_ID, " +
" KEY_PREFIX, " +
" CUSTOM_ENTITY_DATA_ID, " +
@@ -154,18 +176,16 @@ public class CustomEntityDataIT extends BaseClientManagedTimeIT {
stmt.execute();
conn.commit();
- conn.close();
}
@Test
public void testUngroupedAggregation() throws Exception {
- long ts = nextTimestamp();
String tenantId = getOrganizationId();
- String query = "SELECT count(1) FROM CORE.CUSTOM_ENTITY_DATA WHERE organization_id=?";
- String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 5); // Run query at timestamp 5
- Connection conn = DriverManager.getConnection(url, PropertiesUtil.deepCopy(TEST_PROPERTIES));
+ String tableName = generateUniqueName();
+ String query = "SELECT count(1) FROM " + tableName + " WHERE organization_id=?";
+ Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
try {
- initTableValues(tenantId, getDefaultSplits(getOrganizationId()), ts);
+ initTableValues(conn, tenantId, tableName);
PreparedStatement statement = conn.prepareStatement(query);
statement.setString(1, tenantId);
ResultSet rs = statement.executeQuery();
@@ -179,13 +199,12 @@ public class CustomEntityDataIT extends BaseClientManagedTimeIT {
@Test
public void testScan() throws Exception {
- long ts = nextTimestamp();
String tenantId = getOrganizationId();
- String query = "SELECT CREATED_BY,CREATED_DATE,CURRENCY_ISO_CODE,DELETED,DIVISION,LAST_UPDATE,LAST_UPDATE_BY,NAME,OWNER,SYSTEM_MODSTAMP,VAL0,VAL1,VAL2,VAL3,VAL4,VAL5,VAL6,VAL7,VAL8,VAL9 FROM CORE.CUSTOM_ENTITY_DATA WHERE organization_id=?";
- String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 5); // Run query at timestamp 5
- Connection conn = DriverManager.getConnection(url, PropertiesUtil.deepCopy(TEST_PROPERTIES));
+ String tableName = generateUniqueName();
+ String query = "SELECT CREATED_BY,CREATED_DATE,CURRENCY_ISO_CODE,DELETED,DIVISION,LAST_UPDATE,LAST_UPDATE_BY,NAME,OWNER,SYSTEM_MODSTAMP,VAL0,VAL1,VAL2,VAL3,VAL4,VAL5,VAL6,VAL7,VAL8,VAL9 FROM " + tableName + " WHERE organization_id=?";
+ Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
try {
- initTableValues(tenantId, getDefaultSplits(getOrganizationId()), ts);
+ initTableValues(conn, tenantId, tableName);
PreparedStatement statement = conn.prepareStatement(query);
statement.setString(1, tenantId);
ResultSet rs = statement.executeQuery();
@@ -203,14 +222,12 @@ public class CustomEntityDataIT extends BaseClientManagedTimeIT {
@Test
public void testWhereStringConcatExpression() throws Exception {
- long ts = nextTimestamp();
String tenantId = getOrganizationId();
- initTableValues(tenantId, getDefaultSplits(getOrganizationId()), ts);
- String query = "SELECT KEY_PREFIX||CUSTOM_ENTITY_DATA_ID FROM CORE.CUSTOM_ENTITY_DATA where '00A'||val0 LIKE '00A2%'";
- Properties props = new Properties();
- props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ String query = "SELECT KEY_PREFIX||CUSTOM_ENTITY_DATA_ID FROM " + tableName + " where '00A'||val0 LIKE '00A2%'";
+ Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
try {
+ initTableValues(conn, tenantId, tableName);
PreparedStatement statement = conn.prepareStatement(query);
ResultSet rs=statement.executeQuery();
assertTrue (rs.next());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/aea61062/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index eb8df18..7fb2751 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -21,7 +21,6 @@ import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.util.TestUtil.A_VALUE;
import static org.apache.phoenix.util.TestUtil.B_VALUE;
-import static org.apache.phoenix.util.TestUtil.CUSTOM_ENTITY_DATA_FULL_NAME;
import static org.apache.phoenix.util.TestUtil.C_VALUE;
import static org.apache.phoenix.util.TestUtil.PTSDB_NAME;
import static org.apache.phoenix.util.TestUtil.ROW6;
@@ -100,23 +99,54 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
long ts = nextTimestamp();
String tenantId = getOrganizationId();
byte[][] splits = getDefaultSplits(tenantId);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
String aTable = initATableValues(tenantId, saltTable ? null : splits, null, ts-1, getUrl(), saltTable ? "salt_buckets = 2" : null);
String customEntityTable = generateUniqueName();
- ensureTableCreated(getUrl(), customEntityTable, CUSTOM_ENTITY_DATA_FULL_NAME, null, ts-1, saltTable ? "salt_buckets = 2" : null);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts - 1));
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String ddl = "create table " + customEntityTable +
+ " (organization_id char(15) not null, \n" +
+ " key_prefix char(3) not null,\n" +
+ " custom_entity_data_id char(12) not null,\n" +
+ " created_by varchar,\n" +
+ " created_date date,\n" +
+ " currency_iso_code char(3),\n" +
+ " deleted char(1),\n" +
+ " division decimal(31,10),\n" +
+ " last_activity date,\n" +
+ " last_update date,\n" +
+ " last_update_by varchar,\n" +
+ " name varchar(240),\n" +
+ " owner varchar,\n" +
+ " record_type_id char(15),\n" +
+ " setup_owner varchar,\n" +
+ " system_modstamp date,\n" +
+ " b.val0 varchar,\n" +
+ " b.val1 varchar,\n" +
+ " b.val2 varchar,\n" +
+ " b.val3 varchar,\n" +
+ " b.val4 varchar,\n" +
+ " b.val5 varchar,\n" +
+ " b.val6 varchar,\n" +
+ " b.val7 varchar,\n" +
+ " b.val8 varchar,\n" +
+ " b.val9 varchar\n" +
+ " CONSTRAINT pk PRIMARY KEY (organization_id, key_prefix, custom_entity_data_id)) " + (saltTable ? "salt_buckets = 2" : "");
+ conn.createStatement().execute(ddl);
+ conn.close();
+
String indexName = generateUniqueName();
if (createIndex) {
- Properties props = new Properties();
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); // Execute at timestamp 1
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn = DriverManager.getConnection(getUrl(), props);
conn.createStatement().execute("CREATE INDEX IF NOT EXISTS " + indexName + " ON " + aTable + "(a_string)" );
conn.close();
}
PreparedStatement upsertStmt;
- Properties props = new Properties();
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(3)); // Trigger multiple batches
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn = DriverManager.getConnection(getUrl(), props);
conn.setAutoCommit(true);
String upsert = "UPSERT INTO " + customEntityTable + "(custom_entity_data_id, key_prefix, organization_id, created_by) " +
"SELECT substr(entity_id, 4), substr(entity_id, 1, 3), organization_id, a_string FROM " + aTable + " WHERE ?=a_string";