You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/02/02 04:59:40 UTC
[06/50] [abbrv] phoenix git commit: PHOENIX-2478 Rows committed in
transaction overlapping index creation are not populated
PHOENIX-2478 Rows committed in transaction overlapping index creation are not populated
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3520e128
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3520e128
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3520e128
Branch: refs/heads/calcite
Commit: 3520e12858223dfcad343e0a3a29c71fe4d074bc
Parents: 1369937
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Jan 19 08:19:44 2016 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Tue Jan 19 08:19:44 2016 -0800
----------------------------------------------------------------------
.../phoenix/end2end/index/ImmutableIndexIT.java | 239 ++++++++++++++-----
.../apache/phoenix/execute/MutationState.java | 115 +++++----
.../java/org/apache/phoenix/query/BaseTest.java | 103 +++-----
3 files changed, 285 insertions(+), 172 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3520e128/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index 0d329fe..c4ecfbb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.end2end.index;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -29,12 +28,26 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
import org.apache.phoenix.end2end.Shadower;
-import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PropertiesUtil;
@@ -47,80 +60,194 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@RunWith(Parameterized.class)
public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
-
- private final boolean localIndex;
- private final String tableDDLOptions;
- private final String tableName;
+
+ private final boolean localIndex;
+ private final String tableDDLOptions;
+ private final String tableName;
private final String indexName;
private final String fullTableName;
private final String fullIndexName;
-
- public ImmutableIndexIT(boolean localIndex, boolean transactional) {
- this.localIndex = localIndex;
- StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true");
- if (transactional) {
- optionBuilder.append(", TRANSACTIONAL=true");
- }
- this.tableDDLOptions = optionBuilder.toString();
- this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + ( transactional ? "_TXN" : "");
+
+ private static String TABLE_NAME;
+ private static String INDEX_DDL;
+ public static final AtomicInteger NUM_ROWS = new AtomicInteger(1);
+
+ public ImmutableIndexIT(boolean localIndex, boolean transactional) {
+ this.localIndex = localIndex;
+ StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true");
+ if (transactional) {
+ optionBuilder.append(", TRANSACTIONAL=true");
+ }
+ this.tableDDLOptions = optionBuilder.toString();
+ this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + ( transactional ? "_TXN" : "");
this.indexName = "IDX" + ( transactional ? "_TXN" : "");
this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
- }
-
- @BeforeClass
+ }
+
+ @BeforeClass
@Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
public static void doSetup() throws Exception {
- Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
- props.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.toString(true));
- setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1);
+ serverProps.put("hbase.coprocessor.region.classes", CreateIndexRegionObserver.class.getName());
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+ clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true");
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
}
-
- @Parameters(name="localIndex = {0} , transactional = {1}")
+
+ @Parameters(name="localIndex = {0} , transactional = {1}")
public static Collection<Boolean[]> data() {
return Arrays.asList(new Boolean[][] {
- { false, false }, { false, true }, { true, false }, { true, true }
- });
+ { false, true }, { true, true }
+ });
}
-
+
+
@Test
- public void testDropIfImmutableKeyValueColumn() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ public void testCreateIndexDuringUpsertSelect() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(100));
+ TABLE_NAME = fullTableName + "_testCreateIndexDuringUpsertSelect";
+ String ddl ="CREATE TABLE " + TABLE_NAME + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+ INDEX_DDL = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IF NOT EXISTS " + indexName + " ON " + TABLE_NAME
+ + " (long_pk, varchar_pk)"
+ + " INCLUDE (long_col1, long_col2)";
+
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ conn.setAutoCommit(false);
+ Statement stmt = conn.createStatement();
+ stmt.execute(ddl);
+
+ upsertRows(conn, TABLE_NAME, 220);
+ conn.commit();
+
+ // run the upsert select and also create an index
+ conn.setAutoCommit(true);
+ String upsertSelect = "UPSERT INTO " + TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) " +
+ "SELECT varchar_pk||'_upsert_select', char_pk, int_pk, long_pk, decimal_pk, date_pk FROM "+ TABLE_NAME;
+ conn.createStatement().execute(upsertSelect);
+
+ ResultSet rs;
+ rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + TABLE_NAME);
+ assertTrue(rs.next());
+ assertEquals(440,rs.getInt(1));
+ rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + TABLE_NAME);
+ assertTrue(rs.next());
+ assertEquals(440,rs.getInt(1));
+ }
+ finally {
+ conn.close();
+ }
+ }
+
+ // used to create an index while a batch of rows are being written
+ public static class CreateIndexRegionObserver extends SimpleRegionObserver {
+ @Override
+ public void postPut(ObserverContext<RegionCoprocessorEnvironment> c,
+ Put put, WALEdit edit, final Durability durability)
+ throws HBaseIOException {
+ String tableName = c.getEnvironment().getRegion().getRegionInfo()
+ .getTable().getNameAsString();
+ if (tableName.equalsIgnoreCase(TABLE_NAME)
+ // create the index after the second batch of 1000 rows
+ && Bytes.startsWith(put.getRow(), Bytes.toBytes("varchar200_upsert_select"))) {
+ try {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute(INDEX_DDL);
+ }
+ } catch (SQLException e) {
+ throw new DoNotRetryIOException(e);
+ }
+ }
+ }
+ }
+
+ private static class UpsertRunnable implements Runnable {
+ private static final int NUM_ROWS_IN_BATCH = 10000;
+ private final String fullTableName;
+
+ public UpsertRunnable(String fullTableName) {
+ this.fullTableName = fullTableName;
+ }
+
+ public void run() {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ while (true) {
+ // write a large batch of rows
+ boolean fistRowInBatch = true;
+ for (int i=0; i<NUM_ROWS_IN_BATCH; ++i) {
+ BaseTest.upsertRow(conn, fullTableName, NUM_ROWS.intValue(), fistRowInBatch);
+ NUM_ROWS.incrementAndGet();
+ fistRowInBatch = false;
+ }
+ conn.commit();
+ Thread.sleep(500);
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ @Test
+ public void testCreateIndexWhileUpsertingData() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+ String indexDDL = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IF NOT EXISTS " + indexName + " ON " + fullTableName
+ + " (long_pk, varchar_pk)"
+ + " INCLUDE (long_col1, long_col2)";
+ int numThreads = 3;
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- conn.setAutoCommit(false);
- String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
- Statement stmt = conn.createStatement();
- stmt.execute(ddl);
- populateTestTable(fullTableName);
- ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + fullTableName + " (long_col1)";
- stmt.execute(ddl);
-
- ResultSet rs;
-
- rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullTableName);
- assertTrue(rs.next());
- assertEquals(3,rs.getInt(1));
- rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
- assertTrue(rs.next());
- assertEquals(3,rs.getInt(1));
-
- conn.setAutoCommit(true);
- String dml = "DELETE from " + fullTableName + " WHERE long_col2 = 4";
- try {
- conn.createStatement().execute(dml);
- fail();
- } catch (SQLException e) {
- assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(), e.getErrorCode());
- }
-
- conn.createStatement().execute("DROP TABLE " + fullTableName);
+ conn.setAutoCommit(false);
+ Statement stmt = conn.createStatement();
+ stmt.execute(ddl);
+
+ ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+ List<Future<?>> futureList = Lists.newArrayListWithExpectedSize(numThreads);
+ for (int i =0; i<numThreads; ++i) {
+ futureList.add(threadPool.submit(new UpsertRunnable(fullTableName)));
+ }
+ // upsert some rows before creating the index
+ Thread.sleep(5000);
+
+ // create the index
+ try (Connection conn2 = DriverManager.getConnection(getUrl(), props)) {
+ conn2.setAutoCommit(false);
+ Statement stmt2 = conn2.createStatement();
+ stmt2.execute(indexDDL);
+ conn2.commit();
+ }
+
+ // upsert some rows after creating the index
+ Thread.sleep(1000);
+ // cancel the running threads
+ for (Future<?> future : futureList) {
+ future.cancel(true);
+ }
+ threadPool.shutdownNow();
+ threadPool.awaitTermination(30, TimeUnit.SECONDS);
+ Thread.sleep(1000);
+
+ ResultSet rs;
+ rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + fullTableName);
+ assertTrue(rs.next());
+ int dataTableRowCount = rs.getInt(1);
+ rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+ assertTrue(rs.next());
+ int indexTableRowCount = rs.getInt(1);
+ assertEquals("Data and Index table should have the same number of rows ", dataTableRowCount, indexTableRowCount);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3520e128/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index a6fe98d..8caac5d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -135,7 +135,7 @@ public class MutationState implements SQLCloseable {
private int numRows = 0;
private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
private boolean isExternalTxContext = false;
- private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations;
+ private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();
private final MutationMetricQueue mutationMetricQueue;
private ReadMetricQueue readMetricQueue;
@@ -435,6 +435,59 @@ public class MutationState implements SQLCloseable {
return sizeOffset + numRows;
}
+ private void joinMutationState(TableRef tableRef, Map<ImmutableBytesPtr,RowMutationState> srcRows,
+ Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) {
+ PTable table = tableRef.getTable();
+ boolean isIndex = table.getType() == PTableType.INDEX;
+ boolean incrementRowCount = dstMutations == this.mutations;
+ Map<ImmutableBytesPtr,RowMutationState> existingRows = dstMutations.put(tableRef, srcRows);
+ if (existingRows != null) { // Rows for that table already exist
+ // Loop through new rows and replace existing with new
+ for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : srcRows.entrySet()) {
+ // Replace existing row with new row
+ RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
+ if (existingRowMutationState != null) {
+ Map<PColumn,byte[]> existingValues = existingRowMutationState.getColumnValues();
+ if (existingValues != PRow.DELETE_MARKER) {
+ Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues();
+ // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row.
+ if (newRow != PRow.DELETE_MARKER) {
+ // Merge existing column values with new column values
+ existingRowMutationState.join(rowEntry.getValue());
+ // Now that the existing row has been merged with the new row, replace it back
+ // again (since it was merged with the new one above).
+ existingRows.put(rowEntry.getKey(), existingRowMutationState);
+ }
+ }
+ } else {
+ if (incrementRowCount && !isIndex) { // Don't count index rows in row count
+ numRows++;
+ }
+ }
+ }
+ // Put the existing one back now that it's merged
+ dstMutations.put(tableRef, existingRows);
+ } else {
+ // Size new map at batch size as that's what it'll likely grow to.
+ Map<ImmutableBytesPtr,RowMutationState> newRows = Maps.newHashMapWithExpectedSize(connection.getMutateBatchSize());
+ newRows.putAll(srcRows);
+ dstMutations.put(tableRef, newRows);
+ if (incrementRowCount && !isIndex) {
+ numRows += srcRows.size();
+ }
+ }
+ }
+
+ private void joinMutationState(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> srcMutations,
+ Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) {
+ // Merge newMutation with this one, keeping state from newMutation for any overlaps
+ for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : srcMutations.entrySet()) {
+ // Replace existing entries for the table with new entries
+ TableRef tableRef = entry.getKey();
+ Map<ImmutableBytesPtr,RowMutationState> srcRows = entry.getValue();
+ joinMutationState(tableRef, srcRows, dstMutations);
+ }
+ }
/**
* Combine a newer mutation with this one, where in the event of overlaps, the newer one will take precedence.
* Combine any metrics collected for the newer mutation.
@@ -453,48 +506,12 @@ public class MutationState implements SQLCloseable {
txAwares.addAll(newMutationState.txAwares);
}
this.sizeOffset += newMutationState.sizeOffset;
- // Merge newMutation with this one, keeping state from newMutation for any overlaps
- for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutationState.mutations.entrySet()) {
- // Replace existing entries for the table with new entries
- TableRef tableRef = entry.getKey();
- PTable table = tableRef.getTable();
- boolean isIndex = table.getType() == PTableType.INDEX;
- Map<ImmutableBytesPtr,RowMutationState> existingRows = this.mutations.put(tableRef, entry.getValue());
- if (existingRows != null) { // Rows for that table already exist
- // Loop through new rows and replace existing with new
- for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) {
- // Replace existing row with new row
- RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
- if (existingRowMutationState != null) {
- Map<PColumn,byte[]> existingValues = existingRowMutationState.getColumnValues();
- if (existingValues != PRow.DELETE_MARKER) {
- Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues();
- // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row.
- if (newRow != PRow.DELETE_MARKER) {
- // Merge existing column values with new column values
- existingRowMutationState.join(rowEntry.getValue());
- // Now that the existing row has been merged with the new row, replace it back
- // again (since it was merged with the new one above).
- existingRows.put(rowEntry.getKey(), existingRowMutationState);
- }
- }
- } else {
- if (!isIndex) { // Don't count index rows in row count
- numRows++;
- }
- }
- }
- // Put the existing one back now that it's merged
- this.mutations.put(entry.getKey(), existingRows);
- } else {
- // Size new map at batch size as that's what it'll likely grow to.
- Map<ImmutableBytesPtr,RowMutationState> newRows = Maps.newHashMapWithExpectedSize(connection.getMutateBatchSize());
- newRows.putAll(entry.getValue());
- this.mutations.put(tableRef, newRows);
- if (!isIndex) {
- numRows += entry.getValue().size();
- }
+ joinMutationState(newMutationState.mutations, this.mutations);
+ if (!newMutationState.txMutations.isEmpty()) {
+ if (txMutations.isEmpty()) {
+ txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
}
+ joinMutationState(newMutationState.txMutations, this.txMutations);
}
mutationMetricQueue.combineMetricQueues(newMutationState.mutationMetricQueue);
if (readMetricQueue == null) {
@@ -915,6 +932,7 @@ public class MutationState implements SQLCloseable {
long startTime = System.currentTimeMillis();
child.addTimelineAnnotation("Attempt " + retryCount);
hTable.batch(mutationList);
+ if (logger.isDebugEnabled()) logger.debug("Sent batch of " + numMutations + " for " + Bytes.toString(htableName));
child.stop();
child.stop();
shouldRetry = false;
@@ -980,13 +998,13 @@ public class MutationState implements SQLCloseable {
// committed in the event of a failure.
if (isTransactional) {
addUncommittedStatementIndexes(valuesMap.values());
- if (txMutations == null) {
+ if (txMutations.isEmpty()) {
txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
}
// Keep all mutations we've encountered until a commit or rollback.
// This is not ideal, but there's not good way to get the values back
// in the event that we need to replay the commit.
- txMutations.put(tableRef, valuesMap);
+ joinMutationState(tableRef, valuesMap, txMutations);
}
// Remove batches as we process them
if (sendAll) {
@@ -1082,7 +1100,7 @@ public class MutationState implements SQLCloseable {
private void resetTransactionalState() {
tx = null;
txAwares.clear();
- txMutations = null;
+ txMutations = Collections.emptyMap();
uncommittedPhysicalNames.clear();
uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
}
@@ -1187,9 +1205,7 @@ public class MutationState implements SQLCloseable {
break;
}
retryCount++;
- if (txMutations != null) {
- mutations.putAll(txMutations);
- }
+ mutations.putAll(txMutations);
} while (true);
}
@@ -1214,6 +1230,7 @@ public class MutationState implements SQLCloseable {
if (result.getTable() == null) {
throw new TableNotFoundException(dataTable.getSchemaName().getString(), dataTable.getTableName().getString());
}
+ tableRef.setTable(result.getTable());
if (!result.wasUpdated()) {
if (logger.isInfoEnabled()) logger.info("No updates to " + dataTable.getName().getString() + " as of " + timestamp);
continue;
@@ -1223,7 +1240,7 @@ public class MutationState implements SQLCloseable {
// that an index was dropped and recreated with the same name but different
// indexed/covered columns.
addedIndexes = (!oldIndexes.equals(result.getTable().getIndexes()));
- if (logger.isInfoEnabled()) logger.info((addedIndexes ? "Updates " : "No updates ") + "as of " + timestamp + " to " + dataTable.getName().getString() + " with indexes " + dataTable.getIndexes());
+ if (logger.isInfoEnabled()) logger.info((addedIndexes ? "Updates " : "No updates ") + "as of " + timestamp + " to " + dataTable.getName().getString() + " with indexes " + tableRef.getTable().getIndexes());
}
}
if (logger.isInfoEnabled()) logger.info((addedIndexes ? "Updates " : "No updates ") + "to indexes as of " + getInitialWritePointer());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3520e128/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 35bb8ce..951bfce 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.query;
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
import static org.apache.phoenix.util.PhoenixRuntime.CURRENT_SCN_ATTRIB;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
@@ -150,6 +151,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.hbase.index.balancer.IndexLoadBalancer;
import org.apache.phoenix.hbase.index.master.IndexMasterObserver;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixTestDriver;
@@ -200,7 +202,7 @@ import com.google.inject.util.Providers;
public abstract class BaseTest {
protected static final String TEST_TABLE_SCHEMA = "(" +
" varchar_pk VARCHAR NOT NULL, " +
- " char_pk CHAR(6) NOT NULL, " +
+ " char_pk CHAR(10) NOT NULL, " +
" int_pk INTEGER NOT NULL, "+
" long_pk BIGINT NOT NULL, " +
" decimal_pk DECIMAL(31, 10) NOT NULL, " +
@@ -1805,77 +1807,44 @@ public abstract class BaseTest {
public HBaseTestingUtility getUtility() {
return utility;
}
+
+ public static void upsertRows(Connection conn, String fullTableName, int numRows) throws SQLException {
+ for (int i=1; i<=numRows; ++i) {
+ upsertRow(conn, fullTableName, i, false);
+ }
+ }
+
+ public static void upsertRow(Connection conn, String fullTableName, int index, boolean firstRowInBatch) throws SQLException {
+ String upsert = "UPSERT INTO " + fullTableName
+ + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ PreparedStatement stmt = conn.prepareStatement(upsert);
+ stmt.setString(1, firstRowInBatch ? "firstRowInBatch_" : "" + "varchar"+index);
+ stmt.setString(2, "char"+index);
+ stmt.setInt(3, index);
+ stmt.setLong(4, index);
+ stmt.setBigDecimal(5, new BigDecimal(index));
+ Date date = DateUtil.parseDate("2015-01-01 00:00:00");
+ stmt.setDate(6, date);
+ stmt.setString(7, "varchar_a");
+ stmt.setString(8, "chara");
+ stmt.setInt(9, index+1);
+ stmt.setLong(10, index+1);
+ stmt.setBigDecimal(11, new BigDecimal(index+1));
+ stmt.setDate(12, date);
+ stmt.setString(13, "varchar_b");
+ stmt.setString(14, "charb");
+ stmt.setInt(15, index+2);
+ stmt.setLong(16, index+2);
+ stmt.setBigDecimal(17, new BigDecimal(index+2));
+ stmt.setDate(18, date);
+ stmt.executeUpdate();
+ }
// Populate the test table with data.
public static void populateTestTable(String fullTableName) throws SQLException {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- String upsert = "UPSERT INTO " + fullTableName
- + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
- PreparedStatement stmt = conn.prepareStatement(upsert);
- stmt.setString(1, "varchar1");
- stmt.setString(2, "char1");
- stmt.setInt(3, 1);
- stmt.setLong(4, 1L);
- stmt.setBigDecimal(5, new BigDecimal(1.0));
- Date date = DateUtil.parseDate("2015-01-01 00:00:00");
- stmt.setDate(6, date);
- stmt.setString(7, "varchar_a");
- stmt.setString(8, "chara");
- stmt.setInt(9, 2);
- stmt.setLong(10, 2L);
- stmt.setBigDecimal(11, new BigDecimal(2.0));
- stmt.setDate(12, date);
- stmt.setString(13, "varchar_b");
- stmt.setString(14, "charb");
- stmt.setInt(15, 3);
- stmt.setLong(16, 3L);
- stmt.setBigDecimal(17, new BigDecimal(3.0));
- stmt.setDate(18, date);
- stmt.executeUpdate();
-
- stmt.setString(1, "varchar2");
- stmt.setString(2, "char2");
- stmt.setInt(3, 2);
- stmt.setLong(4, 2L);
- stmt.setBigDecimal(5, new BigDecimal(2.0));
- date = DateUtil.parseDate("2015-01-02 00:00:00");
- stmt.setDate(6, date);
- stmt.setString(7, "varchar_a");
- stmt.setString(8, "chara");
- stmt.setInt(9, 3);
- stmt.setLong(10, 3L);
- stmt.setBigDecimal(11, new BigDecimal(3.0));
- stmt.setDate(12, date);
- stmt.setString(13, "varchar_b");
- stmt.setString(14, "charb");
- stmt.setInt(15, 4);
- stmt.setLong(16, 4L);
- stmt.setBigDecimal(17, new BigDecimal(4.0));
- stmt.setDate(18, date);
- stmt.executeUpdate();
-
- stmt.setString(1, "varchar3");
- stmt.setString(2, "char3");
- stmt.setInt(3, 3);
- stmt.setLong(4, 3L);
- stmt.setBigDecimal(5, new BigDecimal(3.0));
- date = DateUtil.parseDate("2015-01-03 00:00:00");
- stmt.setDate(6, date);
- stmt.setString(7, "varchar_a");
- stmt.setString(8, "chara");
- stmt.setInt(9, 4);
- stmt.setLong(10, 4L);
- stmt.setBigDecimal(11, new BigDecimal(4.0));
- stmt.setDate(12, date);
- stmt.setString(13, "varchar_b");
- stmt.setString(14, "charb");
- stmt.setInt(15, 5);
- stmt.setLong(16, 5L);
- stmt.setBigDecimal(17, new BigDecimal(5.0));
- stmt.setDate(18, date);
- stmt.executeUpdate();
-
+ upsertRows(conn, fullTableName, 3);
conn.commit();
}
}