Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/09/07 18:35:54 UTC

[1/3] phoenix git commit: PHOENIX-4173 Ensure that the rebuild fails if an index transitions back to disabled while rebuilding

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.2 1fb01af6b -> 5cf07c4ce


PHOENIX-4173 Ensure that the rebuild fails if an index transitions back to disabled while rebuilding


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3c5e48d9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3c5e48d9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3c5e48d9

Branch: refs/heads/4.x-HBase-1.2
Commit: 3c5e48d9246f44cc39181b9c1cb9b51fb60bdd32
Parents: 1fb01af
Author: James Taylor <ja...@apache.org>
Authored: Wed Sep 6 12:46:34 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Sep 7 11:34:13 2017 -0700

----------------------------------------------------------------------
 .../end2end/index/PartialIndexRebuilderIT.java  | 151 ++++++++++++++++++-
 1 file changed, 143 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
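Note for readers skimming the patch: the new test never waits for a real write failure to flip the index state. It drives the transitions itself by writing to SYSTEM.CATALOG through IndexUtil.updateIndexState() and by advancing an injected clock. A condensed sketch of that pattern, using only calls that appear in the test below (metaTable, clock, conn, WAIT_AFTER_DISABLED and the table/index names are assumed to be set up exactly as the test sets them up; this is a fragment of the test flow, not standalone code):

    // With INDEX_DISABLE_TIMESTAMP = 0 the background rebuilder leaves the
    // index alone, so rows can be written while it is out of sync.
    IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE);
    // ... upsert rows into the data table ...

    // Re-disabling with a real timestamp is what makes the partial rebuilder
    // pick the index up on its next run.
    clock.time += 100;
    long disableTime = clock.currentTime();
    IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);

    // Once enough (injected) time has passed, the rebuild should complete,
    // the index should go ACTIVE, and it should match the data table row for row.
    clock.time += WAIT_AFTER_DISABLED * 2;
    TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
    IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);

The point being exercised here is that a second transition back to DISABLE in the middle of a rebuild must make that in-flight rebuild run fail (to be retried later) rather than appear to succeed.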


http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c5e48d9/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index cacf0fa..067f50f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
@@ -30,7 +31,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -38,10 +39,13 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PMetaData;
 import org.apache.phoenix.schema.PTable;
@@ -634,6 +638,94 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    private final static CountDownLatch WAIT_FOR_REBUILD_TO_START = new CountDownLatch(1);
+    private final static CountDownLatch WAIT_FOR_INDEX_WRITE = new CountDownLatch(1);
+
+    
+    @Test
+    public void testDisableIndexDuringRebuild() throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        PTableKey key = new PTableKey(null,fullTableName);
+        final MyClock clock = new MyClock(1000);
+        EnvironmentEdgeManager.injectEdge(clock);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache();
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            clock.time += 100;
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2) INCLUDE (v3)");
+            clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0','x')");
+            conn.commit();
+            clock.time += 100;
+            try (HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
+                // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the partial index rebuilder from triggering
+                IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE);
+                clock.time += 100;
+                long disableTime = clock.currentTime();
+                // Set some values while index disabled
+                conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb', '11','yy')");
+                conn.commit();
+                clock.time += 100;
+                assertTrue(hasDisabledIndex(metaCache, key));
+                conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','222','zzz')");
+                conn.commit();
+                clock.time += 100;
+                conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','dddd','3333','zzzz')");
+                conn.commit();
+                clock.time += 100;
+                // Will cause partial index rebuilder to be triggered
+                IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
+                final CountDownLatch doneSignal = new CountDownLatch(1);
+                advanceClockUntilPartialRebuildStarts(fullIndexName, clock, doneSignal);
+                // Set some values while index is in INACTIVE state
+                clock.time += 100;
+                conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee','44444','zzzzz')");
+                conn.commit();
+                clock.time += 100;
+                conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','fffff','55555','zzzzzz')");
+                conn.commit();
+                doneSignal.await(30, TimeUnit.SECONDS);
+                // Install coprocessor that will simulate an index write failure during index rebuild
+                addWriteFailingCoprocessor(conn,fullIndexName);
+                clock.time += WAIT_AFTER_DISABLED;
+                doneSignal.await(30, TimeUnit.SECONDS);
+                WAIT_FOR_REBUILD_TO_START.await(30, TimeUnit.SECONDS);
+                // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the partial index rebuilder from triggering
+                IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE);
+                clock.time += 100;
+                disableTime = clock.currentTime();
+                // Set some values while index disabled
+                conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bbbbb', '11','yy')");
+                conn.commit();
+                clock.time += 100;
+                conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','cccccc','222','zzz')");
+                conn.commit();
+                clock.time += 100;
+                conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ddddddd','3333','zzzz')");
+                conn.commit();
+                clock.time += 100;
+                // Simulates another write failure, which should cause the current run of the rebuilder to fail and retry later
+                IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
+                removeWriteFailingCoprocessor(conn,fullIndexName);
+                WAIT_FOR_INDEX_WRITE.countDown();
+            }
+            // Original rebuilder should have failed
+            
+            advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
+            clock.time += WAIT_AFTER_DISABLED * 2;
+            // Enough time has passed, so rebuild will start now
+            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            clock.time += 100;
+            IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+        } finally {
+            EnvironmentEdgeManager.injectEdge(null);
+        }
+    }
+
     @Test
     public void testDeleteAndUpsertValuesAtSameTS1() throws Throwable {
         String schemaName = generateUniqueName();
@@ -751,15 +843,58 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
         t.start();
     }
     
-    public static class DelayingRegionObserver extends SimpleRegionObserver {
-        @Override
-        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
-            try {
-                Thread.sleep(Math.abs(RAND.nextInt()) % 10);
-            } catch (InterruptedException e) {
+    private static void addWriteFailingCoprocessor(Connection conn, String tableName) throws Exception {
+        int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100;
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
+        descriptor.addCoprocessor(WriteFailingRegionObserver.class.getName(), null, priority, null);
+        int numTries = 10;
+        try (HBaseAdmin admin = services.getAdmin()) {
+            admin.modifyTable(Bytes.toBytes(tableName), descriptor);
+            while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor)
+                    && numTries > 0) {
+                numTries--;
+                if (numTries == 0) {
+                    throw new Exception(
+                            "Could not verify that the write-failing coprocessor "
+                                    + "was added within 10 retries.");
+                }
+                Thread.sleep(1000);
             }
-            
         }
     }
     
+    private static void removeWriteFailingCoprocessor(Connection conn, String tableName) throws Exception {
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
+        descriptor.removeCoprocessor(WriteFailingRegionObserver.class.getName());
+        int numTries = 10;
+        try (HBaseAdmin admin = services.getAdmin()) {
+            admin.modifyTable(Bytes.toBytes(tableName), descriptor);
+            while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor)
+                    && numTries > 0) {
+                numTries--;
+                if (numTries == 0) {
+                    throw new Exception(
+                            "Could not verify that the write-failing coprocessor "
+                                    + "was removed within 10 retries.");
+                }
+                Thread.sleep(1000);
+            }
+        }
+    }
+    
+    public static class WriteFailingRegionObserver extends SimpleRegionObserver {
+        @Override
+        public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+            WAIT_FOR_REBUILD_TO_START.countDown();
+            try {
+                WAIT_FOR_INDEX_WRITE.await(30, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException(e);
+            }
+        }
+    }
+
 }
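
The coordination between the JUnit thread and the region server in the test above is just a pair of static CountDownLatches: the coprocessor's postBatchMutate() counts down WAIT_FOR_REBUILD_TO_START and then parks on WAIT_FOR_INDEX_WRITE, giving the test a window to flip the index back to DISABLE before releasing the rebuild's write. Stripped of the Phoenix specifics, the handshake looks like this (a standalone illustration, not code from the patch; all names here are made up):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class LatchHandshakeDemo {
        static final CountDownLatch REACHED_WRITE = new CountDownLatch(1);
        static final CountDownLatch RELEASE_WRITE = new CountDownLatch(1);

        public static void main(String[] args) throws Exception {
            Thread writer = new Thread(() -> {
                REACHED_WRITE.countDown();                     // "the interesting write is about to happen"
                try {
                    RELEASE_WRITE.await(30, TimeUnit.SECONDS); // park until the driver has interfered
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("writer: proceeding");
            });
            writer.start();
            REACHED_WRITE.await(30, TimeUnit.SECONDS);         // driver: wait for the writer to arrive
            System.out.println("driver: changing state while the writer is parked");
            RELEASE_WRITE.countDown();                         // let the writer continue
            writer.join();
        }
    }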


[3/3] phoenix git commit: PHOENIX-3953 Clear INDEX_DISABLED_TIMESTAMP and disable index on compaction (addendum 2)

Posted by ja...@apache.org.
PHOENIX-3953 Clear INDEX_DISABLED_TIMESTAMP and disable index on compaction (addendum 2)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5cf07c4c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5cf07c4c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5cf07c4c

Branch: refs/heads/4.x-HBase-1.2
Commit: 5cf07c4ce64174241e0c311c6f9e1905374aaeca
Parents: aea6106
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Sep 7 11:26:47 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Sep 7 11:34:53 2017 -0700

----------------------------------------------------------------------
 .../UngroupedAggregateRegionObserver.java       | 62 ++++++++++++++-
 .../org/apache/phoenix/hbase/index/Indexer.java | 42 +++++-----
 .../stats/DefaultStatisticsCollector.java       | 83 ++++++--------------
 .../schema/stats/NoOpStatisticsCollector.java   |  2 +-
 .../schema/stats/StatisticsCollector.java       |  2 +-
 5 files changed, 111 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
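In practical terms: once a major (or all-files) compaction runs over a data table whose index is pending a partial rebuild, the delete markers the rebuild would have needed may already be gone, so the index is permanently disabled instead of being rebuilt from incomplete history. A rough way to observe the new behaviour from a client, sketched only for illustration (the flush/compaction plumbing is abbreviated, major compaction is asynchronous so a real check would poll for completion, conn and fullTableName are assumed to exist, and none of this code is part of the patch):

    HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
    admin.flush(fullTableName);
    admin.majorCompact(fullTableName);   // asynchronous; wait for it to finish before checking
    // An index that had a non-zero INDEX_DISABLE_TIMESTAMP before the compaction
    // should now be permanently DISABLEd with the timestamp cleared back to 0.
    PTable table = PhoenixRuntime.getTableNoCache(conn.unwrap(PhoenixConnection.class), fullTableName);
    for (PTable index : table.getIndexes()) {
        assertEquals(PIndexState.DISABLE, index.getIndexState());
        assertEquals(0L, index.getIndexDisableTimestamp());
    }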


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cf07c4c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 31c83e4..a61f502 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -55,9 +55,12 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -69,11 +72,14 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
 import org.apache.phoenix.execute.TupleProjector;
@@ -89,11 +95,13 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
@@ -899,6 +907,58 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         });
     }
 
+    @Override
+    public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> e, final Store store,
+            final StoreFile resultFile, CompactionRequest request) throws IOException {
+        // If we're compacting all files, then delete markers are removed
+        // and we must permanently disable an index that needs to be
+        // partially rebuilt because we're potentially losing the information
+        // we need to successfully rebuild it.
+        if (request.isAllFiles() || request.isMajor()) {
+            // Compaction and split upcalls run with the effective user context of the requesting user.
+            // This will lead to failure of cross cluster RPC if the effective user is not
+            // the login user. Switch to the login user context to ensure we have the expected
+            // security context.
+            User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                    MutationCode mutationCode = null;
+                    long disableIndexTimestamp = 0;
+                    
+                    try (HTableInterface htable = e.getEnvironment().getTable(
+                                SchemaUtil.getPhysicalTableName(
+                                        PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
+                                        e.getEnvironment().getConfiguration()))) {
+                        String tableName = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
+                        // FIXME: if this is an index on a view, we won't find a row for it in SYSTEM.CATALOG
+                        // Instead, we need to disable all indexes on the view.
+                        byte[] tableKey = SchemaUtil.getTableKeyFromFullName(tableName);
+                        Get get = new Get(tableKey);
+                        get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
+                        Result result = htable.get(get);
+                        if (!result.isEmpty()) {
+                            Cell cell = result.listCells().get(0);
+                            if (cell.getValueLength() > 0) {
+                                disableIndexTimestamp = PLong.INSTANCE.getCodec().decodeLong(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault());
+                                if (disableIndexTimestamp != 0) {
+                                    mutationCode = IndexUtil.updateIndexState(tableKey, 0L, htable, PIndexState.DISABLE).getMutationCode();
+                                }
+                            }
+                        }
+                    } catch (Throwable t) { // log, but swallow exception as we don't want to impact compaction
+                        logger.warn("Potential failure to permanently disable index during compaction " +  e.getEnvironment().getRegionInfo().getTable().getNameAsString(), t);
+                    } finally {
+                        if (disableIndexTimestamp != 0 && mutationCode != MutationCode.TABLE_ALREADY_EXISTS && mutationCode != MutationCode.TABLE_NOT_FOUND) {
+                            logger.warn("Attempt to permanently disable index " + e.getEnvironment().getRegionInfo().getTable().getNameAsString() + 
+                                    " during compaction" + (mutationCode == null ? "" : " failed with code = " + mutationCode));
+                        }
+                    }
+                    return null;
+                }
+            });
+        }
+    }
+
     private static PTable deserializeTable(byte[] b) {
         try {
             PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b);
@@ -1115,7 +1175,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             long rowCount = 0;
             try {
                 if (!compactionRunning) {
-                    stats.init(false);
+                    stats.init();
                     synchronized (innerScanner) {
                         do {
                             List<Cell> results = new ArrayList<Cell>();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cf07c4c/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 8072fba..4273eb1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -62,6 +62,8 @@ import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.security.User;
@@ -845,21 +847,25 @@ public class Indexer extends BaseRegionObserver {
   }
   
   @Override
-  public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
-          final InternalScanner scanner, final ScanType scanType) throws IOException {
-      // Compaction and split upcalls run with the effective user context of the requesting user.
-      // This will lead to failure of cross cluster RPC if the effective user is not
-      // the login user. Switch to the login user context to ensure we have the expected
-      // security context.
-      return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
-          @Override
-          public InternalScanner run() throws Exception {
-              InternalScanner internalScanner = scanner;
-              if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
+  public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+          final StoreFile resultFile, CompactionRequest request) throws IOException {
+      // If we're compacting all files, then delete markers are removed
+      // and we must permanently disable an index that needs to be
+      // partially rebuilt because we're potentially losing the information
+      // we need to successfully rebuild it.
+      if (request.isAllFiles() || request.isMajor()) {
+          // Compaction and split upcalls run with the effective user context of the requesting user.
+          // This will lead to failure of cross cluster RPC if the effective user is not
+          // the login user. Switch to the login user context to ensure we have the expected
+          // security context.
+          User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
                   String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
                   try {
                       PhoenixConnection conn =  QueryUtil.getConnectionOnServer(c.getEnvironment().getConfiguration()).unwrap(PhoenixConnection.class);
                       PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+                      // FIXME: we may need to recurse into children of this table too
                       for (PTable index : table.getIndexes()) {
                           if (index.getIndexDisableTimestamp() != 0) {
                               try {
@@ -871,15 +877,15 @@ public class Indexer extends BaseRegionObserver {
                       }
                   } catch (Exception e) {
                       // If we can't reach the stats table, don't interrupt the normal
-                    // compaction operation, just log a warning.
-                    if (LOG.isWarnEnabled()) {
-                        LOG.warn("Unable to permanently disable indexes being partially rebuild for " + fullTableName, e);
-                    }
+                      // compaction operation, just log a warning.
+                      if (LOG.isWarnEnabled()) {
+                          LOG.warn("Unable to permanently disable indexes being partially rebuilt for " + fullTableName, e);
+                      }
                   }
+                  return null;
               }
-              return internalScanner;
-          }
-      });
+          });
+      }
   }
 }
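
Both UngroupedAggregateRegionObserver and Indexer now do this work from postCompact(), gated on the CompactionRequest, instead of intercepting preCompact() and inspecting the ScanType. Condensed to its shape (the class name is made up, and the SYSTEM.CATALOG lookup and state update are elided; see the two diffs above for the real bodies):

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.Store;
    import org.apache.hadoop.hbase.regionserver.StoreFile;
    import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
    import org.apache.hadoop.hbase.security.User;

    public class DisableIndexOnCompactObserver extends BaseRegionObserver {
        @Override
        public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
                final StoreFile resultFile, CompactionRequest request) throws IOException {
            if (request.isAllFiles() || request.isMajor()) {
                // Compaction upcalls run as the requesting user; switch to the login user
                // so the cross-cluster RPC to SYSTEM.CATALOG has the expected security context.
                User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
                    @Override
                    public Void run() throws Exception {
                        // ... read INDEX_DISABLE_TIMESTAMP and, if it is non-zero,
                        // permanently disable the affected index(es) ...
                        return null;
                    }
                });
            }
        }
    }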
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cf07c4c/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
index 61a6fa2..b8ba759 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -42,16 +41,13 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PLong;
-import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TimeKeeper;
@@ -113,10 +109,9 @@ class DefaultStatisticsCollector implements StatisticsCollector {
         }
     }
     
-    private void initGuidepostDepth(boolean isMajorCompaction) throws IOException {
+    private void initGuidepostDepth() throws IOException {
         // First check is if guidepost info set on statement itself
-        boolean guidepostOnStatement = guidePostPerRegionBytes != null || guidePostWidthBytes != null;
-        if (guidepostOnStatement) {
+        if (guidePostPerRegionBytes != null || guidePostWidthBytes != null) {
             int guidepostPerRegion = 0;
             long guidepostWidth = QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES;
             if (guidePostPerRegionBytes != null) {
@@ -127,48 +122,20 @@ class DefaultStatisticsCollector implements StatisticsCollector {
             }
             this.guidePostDepth = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth,
                     env.getRegion().getTableDesc());
-        }
-        
-        if (!guidepostOnStatement || isMajorCompaction) {
+        } else {
             long guidepostWidth = -1;
             HTableInterface htable = null;
             try {
-                // Next check for GUIDE_POST_WIDTH and INDEX_DISABLE_TIMESTAMP on table
-                TableName htableName = SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration());
-                htable = env.getTable(htableName);
+                // Next check for GUIDE_POST_WIDTH on table
+                htable = env.getTable(
+                        SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()));
                 Get get = new Get(ptableKey);
                 get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES);
-                get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
                 Result result = htable.get(get);
                 if (!result.isEmpty()) {
-                    Cell gpwCell = result.getColumnLatestCell(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES);
-                    if (gpwCell != null) {
-                        guidepostWidth = PLong.INSTANCE.getCodec().decodeLong(gpwCell.getValueArray(), gpwCell.getValueOffset(), SortOrder.getDefault());
-                    }
-                    if (isMajorCompaction) {
-                        Cell idtsCell = result.getColumnLatestCell(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
-                        if (idtsCell != null) {
-                            long indexDisableTimestamp = PLong.INSTANCE.getCodec().decodeLong(idtsCell.getValueArray(), idtsCell.getValueOffset(), SortOrder.getDefault());
-                            // If we have a non zero value for INDEX_DISABLE_TIMESTAMP, that means that our global mutable
-                            // secondary index needs to be partially rebuilt. If we're  compacting, though, we may cleanup
-                            // the delete markers of an index *before* the puts for the same row occur during replay. At
-                            // this point the partially index rebuild would leave the index out of sync with the data
-                            // table. In that case, it's better to just permanently disable the index and force it to be
-                            // manually rebuilt
-                            if (indexDisableTimestamp != 0) {
-                                MutationCode mutationCode = IndexUtil.updateIndexState(ptableKey, 0L, htable, PIndexState.DISABLE).getMutationCode();
-                                if (mutationCode != MutationCode.TABLE_ALREADY_EXISTS && mutationCode != MutationCode.TABLE_NOT_FOUND) {
-                                    LOG.warn("Attempt to permanently disable index " + env.getRegionInfo().getTable().getNameAsString() + 
-                                            " during compaction failed with code = " + mutationCode);
-                                }
-                            }
-                        }
-                    }
+                    Cell cell = result.listCells().get(0);
+                    guidepostWidth = PLong.INSTANCE.getCodec().decodeLong(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault());
                 }
-            } catch (IOException e) {
-                throw e;
-            } catch (Throwable t) {
-                throw new IOException(t);
             } finally {
                 if (htable != null) {
                     try {
@@ -178,21 +145,19 @@ class DefaultStatisticsCollector implements StatisticsCollector {
                     }
                 }
             }
-            if (!guidepostOnStatement) {
-                if (guidepostWidth >= 0) {
-                    this.guidePostDepth = guidepostWidth;
-                } else {
-                    // Last use global config value
-                    Configuration config = env.getConfiguration();
-                    this.guidePostDepth = StatisticsUtil.getGuidePostDepth(
-                            config.getInt(
-                                QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
-                                QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION),
-                            config.getLong(
-                                QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
-                                QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES),
-                            env.getRegion().getTableDesc());
-                }
+            if (guidepostWidth >= 0) {
+                this.guidePostDepth = guidepostWidth;
+            } else {
+                // Last use global config value
+                Configuration config = env.getConfiguration();
+                this.guidePostDepth = StatisticsUtil.getGuidePostDepth(
+                        config.getInt(
+                            QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
+                            QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION),
+                        config.getLong(
+                            QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+                            QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES),
+                        env.getRegion().getTableDesc());
             }
         }
     }
@@ -350,13 +315,13 @@ class DefaultStatisticsCollector implements StatisticsCollector {
         StatisticsScanner scanner = new StatisticsScanner(this, statsWriter, env, internalScan, family);
         // We need to initialize the scanner synchronously and potentially perform a cross region Get
         // in order to use the correct guide posts width for the table being compacted.
-        init(true);
+        init();
         return scanner;
     }
 
     @Override
-    public void init(boolean isMajorCompaction) throws IOException {
-        initGuidepostDepth(isMajorCompaction);
+    public void init() throws IOException {
+        initGuidepostDepth();
     }
 
     @Override
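
With the compaction-time index handling moved out of the statistics collector, initGuidepostDepth() is back to a plain three-level lookup: an explicit guide post setting on the statement, otherwise GUIDE_POSTS_WIDTH stored for the table in SYSTEM.CATALOG, otherwise the cluster-wide configuration. That last level is ordinary HBase configuration; for example (a sketch, not part of the patch; the property constants are the ones referenced in the code above and the values are made up):

    Configuration conf = HBaseConfiguration.create();
    // Cluster-wide fallbacks, used only when neither the statement nor the
    // table's SYSTEM.CATALOG row supplies a guide post width.
    conf.setLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, 100L * 1024 * 1024);
    conf.setInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, 0);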

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cf07c4c/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
index a13a722..74d1710 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
@@ -61,7 +61,7 @@ public class NoOpStatisticsCollector implements StatisticsCollector {
     }
 
     @Override 
-    public void init(boolean isMajorCompaction) {
+    public void init() {
         // No-op
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5cf07c4c/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index 9550469..60e83a8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -64,7 +64,7 @@ public interface StatisticsCollector extends Closeable {
      * Called before beginning the collection of statistics through {@link #collectStatistics(List)}
      * @throws IOException 
      */
-    void init(boolean isMajorCompaction) throws IOException;
+    void init() throws IOException;
 
     /**
      * Retrieve the calculated guide post info for the given column family.


[2/3] phoenix git commit: PHOENIX-4175 Convert tests using CURRENT_SCN to not use it when possible

Posted by ja...@apache.org.
PHOENIX-4175 Convert tests using CURRENT_SCN to not use it when possible


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/aea61062
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/aea61062
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/aea61062

Branch: refs/heads/4.x-HBase-1.2
Commit: aea6106284bbf565a521e4e211b090525dec5129
Parents: 3c5e48d
Author: James Taylor <ja...@apache.org>
Authored: Wed Sep 6 18:05:42 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Sep 7 11:34:35 2017 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/CreateSchemaIT.java  | 26 +++----
 .../phoenix/end2end/CustomEntityDataIT.java     | 75 ++++++++++++--------
 .../apache/phoenix/end2end/UpsertSelectIT.java  | 42 +++++++++--
 3 files changed, 90 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
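Where the conversion is possible, the tests follow the same recipe: extend ParallelStatsDisabledIT, generate a unique table (or schema) name per test, and create and populate tables through an ordinary connection instead of pinning every statement to a CURRENT_SCN. Reduced to its moving parts (a sketch assumed to live inside a class extending ParallelStatsDisabledIT; the column list is a placeholder, not one of the schemas used in the tests below):

    @Test
    public void testSomething() throws Exception {
        String tableName = generateUniqueName();
        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
            conn.createStatement().execute(
                    "CREATE TABLE " + tableName + " (k VARCHAR PRIMARY KEY, v VARCHAR)");
            conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('a','b')");
            conn.commit();
            // ... run the assertions against tableName; no CURRENT_SCN juggling required ...
        }
    }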


http://git-wip-us.apache.org/repos/asf/phoenix/blob/aea61062/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java
index 09cd810..fe09dcd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateSchemaIT.java
@@ -30,41 +30,31 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.NewerSchemaAlreadyExistsException;
 import org.apache.phoenix.schema.SchemaAlreadyExistsException;
-import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
 
-public class CreateSchemaIT extends BaseClientManagedTimeIT {
+public class CreateSchemaIT extends ParallelStatsDisabledIT {
 
     @Test
     public void testCreateSchema() throws Exception {
-        long ts = nextTimestamp();
-        Properties props = new Properties();
-        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
         props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(true));
-        String ddl = "CREATE SCHEMA TEST_SCHEMA";
+        String schemaName = generateUniqueName();
+        String ddl = "CREATE SCHEMA " + schemaName;
         try (Connection conn = DriverManager.getConnection(getUrl(), props);
                 HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) {
             conn.createStatement().execute(ddl);
-            assertNotNull(admin.getNamespaceDescriptor("TEST_SCHEMA"));
+            assertNotNull(admin.getNamespaceDescriptor(schemaName));
         }
-        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 10));
-        try (Connection conn = DriverManager.getConnection(getUrl(), props);) {
-            conn.createStatement().execute(ddl);
-            fail();
-        } catch (SchemaAlreadyExistsException e) {
-            // expected
-        }
-        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts - 20));
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.createStatement().execute(ddl);
             fail();
-        } catch (NewerSchemaAlreadyExistsException e) {
+        } catch (SchemaAlreadyExistsException e) {
             // expected
         }
-        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 50));
         Connection conn = DriverManager.getConnection(getUrl(), props);
         try {
             conn.createStatement().execute("CREATE SCHEMA " + SchemaUtil.SCHEMA_FOR_DEFAULT_NAMESPACE);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aea61062/phoenix-core/src/it/java/org/apache/phoenix/end2end/CustomEntityDataIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CustomEntityDataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CustomEntityDataIT.java
index ad0f308..4af2c5c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CustomEntityDataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CustomEntityDataIT.java
@@ -17,7 +17,6 @@
  */
 package org.apache.phoenix.end2end;
 
-import static org.apache.phoenix.util.TestUtil.CUSTOM_ENTITY_DATA_FULL_NAME;
 import static org.apache.phoenix.util.TestUtil.ROW2;
 import static org.apache.phoenix.util.TestUtil.ROW5;
 import static org.apache.phoenix.util.TestUtil.ROW9;
@@ -32,26 +31,49 @@ import java.sql.Date;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.util.Properties;
 
-import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.junit.Test;
 
 
 
-public class CustomEntityDataIT extends BaseClientManagedTimeIT {
+public class CustomEntityDataIT extends ParallelStatsDisabledIT {
     
-    protected static void initTableValues(String tenantId, byte[][] splits, long ts) throws Exception {
-        ensureTableCreated(getUrl(),CUSTOM_ENTITY_DATA_FULL_NAME,CUSTOM_ENTITY_DATA_FULL_NAME, ts-2);
-            
-        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts;
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(url, props);
+    private static void initTableValues(Connection conn, String tenantId, String tableName) throws Exception {
+        String ddl = "create table " + tableName +
+                "   (organization_id char(15) not null, \n" +
+                "    key_prefix char(3) not null,\n" +
+                "    custom_entity_data_id char(12) not null,\n" +
+                "    created_by varchar,\n" +
+                "    created_date date,\n" +
+                "    currency_iso_code char(3),\n" +
+                "    deleted char(1),\n" +
+                "    division decimal(31,10),\n" +
+                "    last_activity date,\n" +
+                "    last_update date,\n" +
+                "    last_update_by varchar,\n" +
+                "    name varchar(240),\n" +
+                "    owner varchar,\n" +
+                "    record_type_id char(15),\n" +
+                "    setup_owner varchar,\n" +
+                "    system_modstamp date,\n" +
+                "    b.val0 varchar,\n" +
+                "    b.val1 varchar,\n" +
+                "    b.val2 varchar,\n" +
+                "    b.val3 varchar,\n" +
+                "    b.val4 varchar,\n" +
+                "    b.val5 varchar,\n" +
+                "    b.val6 varchar,\n" +
+                "    b.val7 varchar,\n" +
+                "    b.val8 varchar,\n" +
+                "    b.val9 varchar,\n" +
+                "    CONSTRAINT pk PRIMARY KEY (organization_id, key_prefix, custom_entity_data_id)) SPLIT ON ('" + tenantId + "00A','" + tenantId + "00B','" + tenantId + "00C')";
+
+        conn.createStatement().execute(ddl);
         // Insert all rows at ts
         PreparedStatement stmt = conn.prepareStatement(
-                "upsert into " +
-                "CORE.CUSTOM_ENTITY_DATA(" +
+                "upsert into " + tableName +
+                "(" +
                 "    ORGANIZATION_ID, " +
                 "    KEY_PREFIX, " +
                 "    CUSTOM_ENTITY_DATA_ID, " +
@@ -154,18 +176,16 @@ public class CustomEntityDataIT extends BaseClientManagedTimeIT {
         stmt.execute();
         
         conn.commit();
-        conn.close();
     }    
 
     @Test
     public void testUngroupedAggregation() throws Exception {
-        long ts = nextTimestamp();
         String tenantId = getOrganizationId();
-        String query = "SELECT count(1) FROM CORE.CUSTOM_ENTITY_DATA WHERE organization_id=?";
-        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 5); // Run query at timestamp 5
-        Connection conn = DriverManager.getConnection(url, PropertiesUtil.deepCopy(TEST_PROPERTIES));
+        String tableName = generateUniqueName();
+        String query = "SELECT count(1) FROM " + tableName + " WHERE organization_id=?";
+        Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
         try {
-            initTableValues(tenantId, getDefaultSplits(getOrganizationId()), ts);
+            initTableValues(conn, tenantId, tableName);
             PreparedStatement statement = conn.prepareStatement(query);
             statement.setString(1, tenantId);
             ResultSet rs = statement.executeQuery();
@@ -179,13 +199,12 @@ public class CustomEntityDataIT extends BaseClientManagedTimeIT {
     
     @Test
     public void testScan() throws Exception {
-        long ts = nextTimestamp();
         String tenantId = getOrganizationId();
-        String query = "SELECT CREATED_BY,CREATED_DATE,CURRENCY_ISO_CODE,DELETED,DIVISION,LAST_UPDATE,LAST_UPDATE_BY,NAME,OWNER,SYSTEM_MODSTAMP,VAL0,VAL1,VAL2,VAL3,VAL4,VAL5,VAL6,VAL7,VAL8,VAL9 FROM CORE.CUSTOM_ENTITY_DATA WHERE organization_id=?";
-        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 5); // Run query at timestamp 5
-        Connection conn = DriverManager.getConnection(url, PropertiesUtil.deepCopy(TEST_PROPERTIES));
+        String tableName = generateUniqueName();
+        String query = "SELECT CREATED_BY,CREATED_DATE,CURRENCY_ISO_CODE,DELETED,DIVISION,LAST_UPDATE,LAST_UPDATE_BY,NAME,OWNER,SYSTEM_MODSTAMP,VAL0,VAL1,VAL2,VAL3,VAL4,VAL5,VAL6,VAL7,VAL8,VAL9 FROM " + tableName + " WHERE organization_id=?";
+        Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
         try {
-            initTableValues(tenantId, getDefaultSplits(getOrganizationId()), ts);
+            initTableValues(conn, tenantId, tableName);
             PreparedStatement statement = conn.prepareStatement(query);
             statement.setString(1, tenantId);
             ResultSet rs = statement.executeQuery();
@@ -203,14 +222,12 @@ public class CustomEntityDataIT extends BaseClientManagedTimeIT {
     
     @Test
     public void testWhereStringConcatExpression() throws Exception {
-        long ts = nextTimestamp();
         String tenantId = getOrganizationId();
-        initTableValues(tenantId, getDefaultSplits(getOrganizationId()), ts);
-        String query = "SELECT KEY_PREFIX||CUSTOM_ENTITY_DATA_ID FROM CORE.CUSTOM_ENTITY_DATA where '00A'||val0 LIKE '00A2%'";
-        Properties props = new Properties();
-        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        String query = "SELECT KEY_PREFIX||CUSTOM_ENTITY_DATA_ID FROM " + tableName + " where '00A'||val0 LIKE '00A2%'";
+        Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
         try {
+            initTableValues(conn, tenantId, tableName);
             PreparedStatement statement = conn.prepareStatement(query);
             ResultSet rs=statement.executeQuery();
             assertTrue (rs.next());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aea61062/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index eb8df18..7fb2751 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -21,7 +21,6 @@ import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.util.TestUtil.A_VALUE;
 import static org.apache.phoenix.util.TestUtil.B_VALUE;
-import static org.apache.phoenix.util.TestUtil.CUSTOM_ENTITY_DATA_FULL_NAME;
 import static org.apache.phoenix.util.TestUtil.C_VALUE;
 import static org.apache.phoenix.util.TestUtil.PTSDB_NAME;
 import static org.apache.phoenix.util.TestUtil.ROW6;
@@ -100,23 +99,54 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
         long ts = nextTimestamp();
         String tenantId = getOrganizationId();
         byte[][] splits = getDefaultSplits(tenantId);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         String aTable = initATableValues(tenantId, saltTable ? null : splits, null, ts-1, getUrl(), saltTable ? "salt_buckets = 2" : null);
 
         String customEntityTable = generateUniqueName();
-        ensureTableCreated(getUrl(), customEntityTable, CUSTOM_ENTITY_DATA_FULL_NAME, null, ts-1, saltTable ? "salt_buckets = 2" : null);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts - 1));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String ddl = "create table " + customEntityTable +
+                "   (organization_id char(15) not null, \n" +
+                "    key_prefix char(3) not null,\n" +
+                "    custom_entity_data_id char(12) not null,\n" +
+                "    created_by varchar,\n" +
+                "    created_date date,\n" +
+                "    currency_iso_code char(3),\n" +
+                "    deleted char(1),\n" +
+                "    division decimal(31,10),\n" +
+                "    last_activity date,\n" +
+                "    last_update date,\n" +
+                "    last_update_by varchar,\n" +
+                "    name varchar(240),\n" +
+                "    owner varchar,\n" +
+                "    record_type_id char(15),\n" +
+                "    setup_owner varchar,\n" +
+                "    system_modstamp date,\n" +
+                "    b.val0 varchar,\n" +
+                "    b.val1 varchar,\n" +
+                "    b.val2 varchar,\n" +
+                "    b.val3 varchar,\n" +
+                "    b.val4 varchar,\n" +
+                "    b.val5 varchar,\n" +
+                "    b.val6 varchar,\n" +
+                "    b.val7 varchar,\n" +
+                "    b.val8 varchar,\n" +
+                "    b.val9 varchar,\n" +
+                "    CONSTRAINT pk PRIMARY KEY (organization_id, key_prefix, custom_entity_data_id)) " + (saltTable ? "salt_buckets = 2"  : "");
+        conn.createStatement().execute(ddl);
+        conn.close();
+        
         String indexName = generateUniqueName();
         if (createIndex) {
-            Properties props = new Properties();
             props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); // Execute at timestamp 1
-            Connection conn = DriverManager.getConnection(getUrl(), props);
+            conn = DriverManager.getConnection(getUrl(), props);
             conn.createStatement().execute("CREATE INDEX IF NOT EXISTS " + indexName + " ON " + aTable + "(a_string)" );
             conn.close();
         }
         PreparedStatement upsertStmt;
-        Properties props = new Properties();
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
         props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(3)); // Trigger multiple batches
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn = DriverManager.getConnection(getUrl(), props);
         conn.setAutoCommit(true);
         String upsert = "UPSERT INTO " + customEntityTable + "(custom_entity_data_id, key_prefix, organization_id, created_by) " +
             "SELECT substr(entity_id, 4), substr(entity_id, 1, 3), organization_id, a_string  FROM " + aTable + " WHERE ?=a_string";